package com.offcn.bigdata.streaming.p2

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * updateStateByKey
  *     按照key来更新状态操作
  *     统计截止到目前为止的某个key的状态(value)
  *  所以为了统计key截止到目前的状态，就得需要找一个地方来存储key的历史状态，我们可以成为前置状态，
  *  其次再和当前的批次中该key的状态进行累计，最后将累计的结果又更新会这个存储的地方。
  *  这个要存储的状态，在streaming中是通过checkpoint检查点来进行设置的。
  */
object _04UpdateStateByKeyOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("_02Streaming2KafkaOps")
            .setMaster("local[*]")
        //            .set("spark.streaming.kafka.maxRatePerPartition", "10")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)

        ssc.checkpoint("file:/E:/data/minitor/chk")
        val lines = ssc.socketTextStream("bigdata01", 9999)

        val pairs = lines.flatMap(_.split("\\s+")).map((_, 1))

        val ret = pairs.updateStateByKey(updateFunc)


        ret.print


        ssc.start()
        ssc.awaitTermination()
    }

    /**
      * 状态更新函数
      * @param current      key所对应的当前批次的状态列表
      * @param history   key所对应的历史批次的状态，可能存在可能不存在
      * @return
      */
    def updateFunc(current: Seq[Int], history: Option[Int]): Option[Int] = {
        println(s"current: ${current}, history: ${history.getOrElse(0)}")
        val sum = current.sum + history.getOrElse(0)
        Option(sum)
    }
}
